#!/usr/bin/env python
#
# Test cases for the QMP 'x-blockdev-del' command
#
# Copyright (C) 2015 Igalia, S.L.
# Author: Alberto Garcia <berto@igalia.com>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#

import os
import iotests
import time

base_img = os.path.join(iotests.test_dir, 'base.img')
new_img = os.path.join(iotests.test_dir, 'new.img')

class TestBlockdevDel(iotests.QMPTestCase):

    def setUp(self):
        iotests.qemu_img('create', '-f', iotests.imgfmt, base_img, '1M')
        self.vm = iotests.VM()
        self.vm.launch()

    def tearDown(self):
        self.vm.shutdown()
        os.remove(base_img)
        if os.path.isfile(new_img):
            os.remove(new_img)

    # Check whether a BlockBackend exists
    def checkBlockBackend(self, backend, node, must_exist = True):
        result = self.vm.qmp('query-block')
        backends = filter(lambda x: x['device'] == backend, result['return'])
        self.assertLessEqual(len(backends), 1)
        self.assertEqual(must_exist, len(backends) == 1)
        if must_exist:
            if node:
                self.assertEqual(backends[0]['inserted']['node-name'], node)
            else:
                self.assertFalse(backends[0].has_key('inserted'))

    # Check whether a BlockDriverState exists
    def checkBlockDriverState(self, node, must_exist = True):
        result = self.vm.qmp('query-named-block-nodes')
        nodes = filter(lambda x: x['node-name'] == node, result['return'])
        self.assertLessEqual(len(nodes), 1)
        self.assertEqual(must_exist, len(nodes) == 1)

    # Add a new BlockBackend (with its attached BlockDriverState)
    def addBlockBackend(self, backend, node):
        file_node = '%s_file' % node
        self.checkBlockBackend(backend, node, False)
        self.checkBlockDriverState(node, False)
        self.checkBlockDriverState(file_node, False)
        opts = {'driver': iotests.imgfmt,
                'id': backend,
                'node-name': node,
                'file': {'driver': 'file',
                         'node-name': file_node,
                         'filename': base_img}}
        result = self.vm.qmp('blockdev-add', conv_keys = False, options = opts)
        self.assert_qmp(result, 'return', {})
        self.checkBlockBackend(backend, node)
        self.checkBlockDriverState(node)
        self.checkBlockDriverState(file_node)

    # Add a BlockDriverState without a BlockBackend
    def addBlockDriverState(self, node):
        file_node = '%s_file' % node
        self.checkBlockDriverState(node, False)
        self.checkBlockDriverState(file_node, False)
        opts = {'driver': iotests.imgfmt,
                'node-name': node,
                'file': {'driver': 'file',
                         'node-name': file_node,
                         'filename': base_img}}
        result = self.vm.qmp('blockdev-add', conv_keys = False, options = opts)
        self.assert_qmp(result, 'return', {})
        self.checkBlockDriverState(node)
        self.checkBlockDriverState(file_node)

    # Add a BlockDriverState that will be used as overlay for the base_img BDS
    def addBlockDriverStateOverlay(self, node):
        self.checkBlockDriverState(node, False)
        iotests.qemu_img('create', '-f', iotests.imgfmt,
                         '-b', base_img, new_img, '1M')
        opts = {'driver': iotests.imgfmt,
                'node-name': node,
                'backing': '',
                'file': {'driver': 'file',
                         'filename': new_img}}
        result = self.vm.qmp('blockdev-add', conv_keys = False, options = opts)
        self.assert_qmp(result, 'return', {})
        self.checkBlockDriverState(node)

    # Delete a BlockBackend
    def delBlockBackend(self, backend, node, expect_error = False,
                        destroys_media = True):
        self.checkBlockBackend(backend, node)
        if node:
            self.checkBlockDriverState(node)
        result = self.vm.qmp('x-blockdev-del', id = backend)
        if expect_error:
            self.assert_qmp(result, 'error/class', 'GenericError')
            if node:
                self.checkBlockDriverState(node)
        else:
            self.assert_qmp(result, 'return', {})
            if node:
                self.checkBlockDriverState(node, not destroys_media)
        self.checkBlockBackend(backend, node, must_exist = expect_error)

    # Delete a BlockDriverState
    def delBlockDriverState(self, node, expect_error = False):
        self.checkBlockDriverState(node)
        result = self.vm.qmp('x-blockdev-del', node_name = node)
        if expect_error:
            self.assert_qmp(result, 'error/class', 'GenericError')
        else:
            self.assert_qmp(result, 'return', {})
        self.checkBlockDriverState(node, expect_error)

    # Add a device model
    def addDeviceModel(self, device, backend):
        result = self.vm.qmp('device_add', id = device,
                             driver = 'virtio-blk-pci', drive = backend)
        self.assert_qmp(result, 'return', {})

    # Delete a device model
    def delDeviceModel(self, device):
        result = self.vm.qmp('device_del', id = device)
        self.assert_qmp(result, 'return', {})

        result = self.vm.qmp('system_reset')
        self.assert_qmp(result, 'return', {})

        device_path = '/machine/peripheral/%s/virtio-backend' % device
        event = self.vm.event_wait(name="DEVICE_DELETED",
                                   match={'data': {'path': device_path}})
        self.assertNotEqual(event, None)

        event = self.vm.event_wait(name="DEVICE_DELETED",
                                   match={'data': {'device': device}})
        self.assertNotEqual(event, None)

    # Remove a BlockDriverState
    def ejectDrive(self, backend, node, expect_error = False,
                   destroys_media = True):
        self.checkBlockBackend(backend, node)
        self.checkBlockDriverState(node)
        result = self.vm.qmp('eject', device = backend)
        if expect_error:
            self.assert_qmp(result, 'error/class', 'GenericError')
            self.checkBlockDriverState(node)
            self.checkBlockBackend(backend, node)
        else:
            self.assert_qmp(result, 'return', {})
            self.checkBlockDriverState(node, not destroys_media)
            self.checkBlockBackend(backend, None)

    # Insert a BlockDriverState
    def insertDrive(self, backend, node):
        self.checkBlockBackend(backend, None)
        self.checkBlockDriverState(node)
        result = self.vm.qmp('x-blockdev-insert-medium',
                             device = backend, node_name = node)
        self.assert_qmp(result, 'return', {})
        self.checkBlockBackend(backend, node)
        self.checkBlockDriverState(node)

    # Create a snapshot using 'blockdev-snapshot-sync'
    def createSnapshotSync(self, node, overlay):
        self.checkBlockDriverState(node)
        self.checkBlockDriverState(overlay, False)
        opts = {'node-name': node,
                'snapshot-file': new_img,
                'snapshot-node-name': overlay,
                'format': iotests.imgfmt}
        result = self.vm.qmp('blockdev-snapshot-sync', conv_keys=False, **opts)
        self.assert_qmp(result, 'return', {})
        self.checkBlockDriverState(node)
        self.checkBlockDriverState(overlay)

    # Create a snapshot using 'blockdev-snapshot'
    def createSnapshot(self, node, overlay):
        self.checkBlockDriverState(node)
        self.checkBlockDriverState(overlay)
        result = self.vm.qmp('blockdev-snapshot',
                             node = node, overlay = overlay)
        self.assert_qmp(result, 'return', {})
        self.checkBlockDriverState(node)
        self.checkBlockDriverState(overlay)

    # Create a mirror
    def createMirror(self, backend, node, new_node):
        self.checkBlockBackend(backend, node)
        self.checkBlockDriverState(new_node, False)
        opts = {'device': backend,
                'target': new_img,
                'node-name': new_node,
                'sync': 'top',
                'format': iotests.imgfmt}
        result = self.vm.qmp('drive-mirror', conv_keys=False, **opts)
        self.assert_qmp(result, 'return', {})
        self.checkBlockBackend(backend, node)
        self.checkBlockDriverState(new_node)

    # Complete an existing block job
    def completeBlockJob(self, backend, node_before, node_after):
        self.checkBlockBackend(backend, node_before)
        result = self.vm.qmp('block-job-complete', device=backend)
        self.assert_qmp(result, 'return', {})
        self.wait_until_completed(backend)
        self.checkBlockBackend(backend, node_after)

    # Add a BlkDebug node
    # Note that the purpose of this is to test the x-blockdev-del
    # sanity checks, not to create a usable blkdebug drive
    def addBlkDebug(self, debug, node):
        self.checkBlockDriverState(node, False)
        self.checkBlockDriverState(debug, False)
        image = {'driver': iotests.imgfmt,
                 'node-name': node,
                 'file': {'driver': 'file',
                          'filename': base_img}}
        opts = {'driver': 'blkdebug',
                'node-name': debug,
                'image': image}
        result = self.vm.qmp('blockdev-add', conv_keys = False, options = opts)
        self.assert_qmp(result, 'return', {})
        self.checkBlockDriverState(node)
        self.checkBlockDriverState(debug)

    # Add a BlkVerify node
    # Note that the purpose of this is to test the x-blockdev-del
    # sanity checks, not to create a usable blkverify drive
    def addBlkVerify(self, blkverify, test, raw):
        self.checkBlockDriverState(test, False)
        self.checkBlockDriverState(raw, False)
        self.checkBlockDriverState(blkverify, False)
        iotests.qemu_img('create', '-f', iotests.imgfmt, new_img, '1M')
        node_0 = {'driver': iotests.imgfmt,
                  'node-name': test,
                  'file': {'driver': 'file',
                           'filename': base_img}}
        node_1 = {'driver': iotests.imgfmt,
                  'node-name': raw,
                  'file': {'driver': 'file',
                           'filename': new_img}}
        opts = {'driver': 'blkverify',
                'node-name': blkverify,
                'test': node_0,
                'raw': node_1}
        result = self.vm.qmp('blockdev-add', conv_keys = False, options = opts)
        self.assert_qmp(result, 'return', {})
        self.checkBlockDriverState(test)
        self.checkBlockDriverState(raw)
        self.checkBlockDriverState(blkverify)

    # Add a Quorum node
    def addQuorum(self, quorum, child0, child1):
        self.checkBlockDriverState(child0, False)
        self.checkBlockDriverState(child1, False)
        self.checkBlockDriverState(quorum, False)
        iotests.qemu_img('create', '-f', iotests.imgfmt, new_img, '1M')
        child_0 = {'driver': iotests.imgfmt,
                   'node-name': child0,
                   'file': {'driver': 'file',
                            'filename': base_img}}
        child_1 = {'driver': iotests.imgfmt,
                   'node-name': child1,
                   'file': {'driver': 'file',
                            'filename': new_img}}
        opts = {'driver': 'quorum',
                'node-name': quorum,
                'vote-threshold': 1,
                'children': [ child_0, child_1 ]}
        result = self.vm.qmp('blockdev-add', conv_keys = False, options = opts)
        self.assert_qmp(result, 'return', {})
        self.checkBlockDriverState(child0)
        self.checkBlockDriverState(child1)
        self.checkBlockDriverState(quorum)

    ########################
    # The tests start here #
    ########################

    def testWrongParameters(self):
        self.addBlockBackend('drive0', 'node0')
        result = self.vm.qmp('x-blockdev-del')
        self.assert_qmp(result, 'error/class', 'GenericError')
        result = self.vm.qmp('x-blockdev-del', id='drive0', node_name='node0')
        self.assert_qmp(result, 'error/class', 'GenericError')
        self.delBlockBackend('drive0', 'node0')

    def testBlockBackend(self):
        self.addBlockBackend('drive0', 'node0')
        # You cannot delete a BDS that is attached to a backend
        self.delBlockDriverState('node0', expect_error = True)
        self.delBlockBackend('drive0', 'node0')

    def testBlockDriverState(self):
        self.addBlockDriverState('node0')
        # You cannot delete a file BDS directly
        self.delBlockDriverState('node0_file', expect_error = True)
        self.delBlockDriverState('node0')

    def testEject(self):
        self.addBlockBackend('drive0', 'node0')
        self.ejectDrive('drive0', 'node0')
        self.delBlockBackend('drive0', None)

    def testDeviceModel(self):
        self.addBlockBackend('drive0', 'node0')
        self.addDeviceModel('device0', 'drive0')
        self.ejectDrive('drive0', 'node0', expect_error = True)
        self.delBlockBackend('drive0', 'node0', expect_error = True)
        self.delDeviceModel('device0')
        self.delBlockBackend('drive0', 'node0')

    def testAttachMedia(self):
        # This creates a BlockBackend and removes its media
        self.addBlockBackend('drive0', 'node0')
        self.ejectDrive('drive0', 'node0')
        # This creates a new BlockDriverState and inserts it into the backend
        self.addBlockDriverState('node1')
        self.insertDrive('drive0', 'node1')
        # The backend can't be removed: the new BDS has an extra reference
        self.delBlockBackend('drive0', 'node1', expect_error = True)
        self.delBlockDriverState('node1', expect_error = True)
        # The BDS still exists after being ejected, but now it can be removed
        self.ejectDrive('drive0', 'node1', destroys_media = False)
        self.delBlockDriverState('node1')
        self.delBlockBackend('drive0', None)

    def testSnapshotSync(self):
        self.addBlockBackend('drive0', 'node0')
        self.createSnapshotSync('node0', 'overlay0')
        # This fails because node0 is now being used as a backing image
        self.delBlockDriverState('node0', expect_error = True)
        # This succeeds because overlay0 only has the backend reference
        self.delBlockBackend('drive0', 'overlay0')
        self.checkBlockDriverState('node0', False)

    def testSnapshot(self):
        self.addBlockBackend('drive0', 'node0')
        self.addBlockDriverStateOverlay('overlay0')
        self.createSnapshot('node0', 'overlay0')
        self.delBlockBackend('drive0', 'overlay0', expect_error = True)
        self.delBlockDriverState('node0', expect_error = True)
        self.delBlockDriverState('overlay0', expect_error = True)
        self.ejectDrive('drive0', 'overlay0', destroys_media = False)
        self.delBlockBackend('drive0', None)
        self.delBlockDriverState('node0', expect_error = True)
        self.delBlockDriverState('overlay0')
        self.checkBlockDriverState('node0', False)

    def testMirror(self):
        self.addBlockBackend('drive0', 'node0')
        self.createMirror('drive0', 'node0', 'mirror0')
        # The block job prevents removing the device
        self.delBlockBackend('drive0', 'node0', expect_error = True)
        self.delBlockDriverState('node0', expect_error = True)
        self.delBlockDriverState('mirror0', expect_error = True)
        self.wait_ready('drive0')
        self.completeBlockJob('drive0', 'node0', 'mirror0')
        self.assert_no_active_block_jobs()
        self.checkBlockDriverState('node0', False)
        # This succeeds because the backend now points to mirror0
        self.delBlockBackend('drive0', 'mirror0')

    def testBlkDebug(self):
        self.addBlkDebug('debug0', 'node0')
        # 'node0' is used by the blkdebug node
        self.delBlockDriverState('node0', expect_error = True)
        # But we can remove the blkdebug node directly
        self.delBlockDriverState('debug0')
        self.checkBlockDriverState('node0', False)

    def testBlkVerify(self):
        self.addBlkVerify('verify0', 'node0', 'node1')
        # We cannot remove the children of a blkverify device
        self.delBlockDriverState('node0', expect_error = True)
        self.delBlockDriverState('node1', expect_error = True)
        # But we can remove the blkverify node directly
        self.delBlockDriverState('verify0')
        self.checkBlockDriverState('node0', False)
        self.checkBlockDriverState('node1', False)

    def testQuorum(self):
        if not 'quorum' in iotests.qemu_img_pipe('--help'):
            return
        self.addQuorum('quorum0', 'node0', 'node1')
        # We cannot remove the children of a Quorum device
        self.delBlockDriverState('node0', expect_error = True)
        self.delBlockDriverState('node1', expect_error = True)
        # But we can remove the Quorum node directly
        self.delBlockDriverState('quorum0')
        self.checkBlockDriverState('node0', False)
        self.checkBlockDriverState('node1', False)


if __name__ == '__main__':
    iotests.main(supported_fmts=["qcow2"])
